{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Regression -  Flight Delays with DataCleaning\n",
    "\n",
    "This example notebook is similar to\n",
    "[Regression - Flight Delays](https://github.com/microsoft/SynapseML/blob/master/notebooks/Regression%20-%20Flight%20Delays.ipynb).\n",
    "In this example, we will demonstrate the use of `DataConversion()` in two\n",
    "ways.  First, to convert the data type of several columns after the dataset\n",
    "has been read in to the Spark DataFrame instead of specifying the data types\n",
    "as the file is read in.  Second, to convert columns to categorical columns\n",
    "instead of iterating over the columns and applying the `StringIndexer`.\n",
    "\n",
    "This sample demonstrates how to use the following APIs:\n",
    "- [`TrainRegressor`\n",
    "  ](https://mmlspark.blob.core.windows.net/docs/1.0.15/pyspark/synapse.ml.train.html?#module-synapse.ml.train.TrainRegressor)\n",
    "- [`ComputePerInstanceStatistics`\n",
    "  ](https://mmlspark.blob.core.windows.net/docs/1.0.15/pyspark/synapse.ml.train.html?#module-synapse.ml.train.ComputePerInstanceStatistics)\n",
    "- [`DataConversion`\n",
    "  ](https://mmlspark.blob.core.windows.net/docs/1.0.15/pyspark/synapse.ml.featurize.html?#module-synapse.ml.featurize.DataConversion)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next, import the CSV dataset: retrieve the file if needed, save it locally,\n",
    "read the data into a pandas dataframe via `read_csv()`, then convert it to\n",
    "a Spark dataframe.\n",
    "\n",
    "Print the schema of the dataframe, and note the columns that are `long`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "flightDelay = spark.read.parquet(\n",
    "    \"wasbs://publicwasb@mmlspark.blob.core.windows.net/On_Time_Performance_2012_9.parquet\"\n",
    ")\n",
    "# print some basic info\n",
    "print(\"records read: \" + str(flightDelay.count()))\n",
    "print(\"Schema: \")\n",
    "flightDelay.printSchema()\n",
    "flightDelay.limit(10).toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Use the `DataConversion` transform API to convert the columns listed to\n",
    "double.\n",
    "\n",
    "The `DataConversion` API accepts the following types for the `convertTo`\n",
    "parameter:\n",
    "* `boolean`\n",
    "* `byte`\n",
    "* `short`\n",
    "* `integer`\n",
    "* `long`\n",
    "* `float`\n",
    "* `double`\n",
    "* `string`\n",
    "* `toCategorical`\n",
    "* `clearCategorical`\n",
    "* `date` -- converts a string or long to a date of the format\n",
    "  \"yyyy-MM-dd HH:mm:ss\" unless another format is specified by\n",
    "the `dateTimeFormat` parameter.\n",
    "\n",
    "Again, print the schema and note that the columns are now `double`\n",
    "instead of long."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.featurize import DataConversion\n",
    "\n",
    "flightDelay = DataConversion(\n",
    "    cols=[\n",
    "        \"Quarter\",\n",
    "        \"Month\",\n",
    "        \"DayofMonth\",\n",
    "        \"DayOfWeek\",\n",
    "        \"OriginAirportID\",\n",
    "        \"DestAirportID\",\n",
    "        \"CRSDepTime\",\n",
    "        \"CRSArrTime\",\n",
    "    ],\n",
    "    convertTo=\"double\",\n",
    ").transform(flightDelay)\n",
    "flightDelay.printSchema()\n",
    "flightDelay.limit(10).toPandas()"
   ]
  },
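  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `date` option from the list above is not exercised in this notebook. As a\n",
    "minimal sketch, assuming a hypothetical string column named `FlightDate` in\n",
    "`yyyy-MM-dd` form (the column, the one-row DataFrame, and the chosen format are\n",
    "illustrative and not part of the dataset loaded here), the conversion might\n",
    "look like this:\n",
    "\n",
    "```python\n",
    "from synapse.ml.featurize import DataConversion\n",
    "\n",
    "# Hypothetical one-row DataFrame; `FlightDate` is an assumed column name.\n",
    "dates = spark.createDataFrame([(\"2012-09-01\",)], [\"FlightDate\"])\n",
    "\n",
    "# Parse the string column as a date using a non-default format.\n",
    "converted = DataConversion(\n",
    "    cols=[\"FlightDate\"],\n",
    "    convertTo=\"date\",\n",
    "    dateTimeFormat=\"yyyy-MM-dd\",\n",
    ").transform(dates)\n",
    "converted.printSchema()\n",
    "```\n",
    "\n",
    "The sketch relies on the `spark` session that the notebook environment\n",
    "already provides."
   ]
  },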
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Split the dataset into train and test sets."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train, test = flightDelay.randomSplit([0.75, 0.25])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a regressor model and train it on the dataset.\n",
    "\n",
    "First, use `DataConversion` to convert the columns `Carrier`, `DepTimeBlk`,\n",
    "and `ArrTimeBlk` to categorical data.  Recall that in Notebook 102, this\n",
    "was accomplished by iterating over the columns and converting the strings\n",
    "to index values using the `StringIndexer` API.  The `DataConversion` API\n",
    "simplifies the task by allowing you to specify all columns that will have\n",
    "the same end type in a single command.\n",
    "\n",
    "Create a LinearRegression model using the Limited-memory BFGS solver\n",
    "(`l-bfgs`), an `ElasticNet` mixing parameter of `0.3`, and a `Regularization`\n",
    "of `0.1`.\n",
    "\n",
    "Train the model with the `TrainRegressor` API fit on the training dataset."
   ]
  },
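  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For comparison, the per-column `StringIndexer` approach mentioned above could\n",
    "be sketched roughly as follows. This is an illustrative reconstruction, not\n",
    "the code from the other notebook; the helper name `index_columns` and the\n",
    "`_idx` suffix are assumptions.\n",
    "\n",
    "```python\n",
    "from pyspark.ml.feature import StringIndexer\n",
    "\n",
    "\n",
    "def index_columns(df, cols):\n",
    "    # Replace each string column in `cols` with its numeric StringIndexer index.\n",
    "    for c in cols:\n",
    "        indexer = StringIndexer(inputCol=c, outputCol=c + \"_idx\")\n",
    "        df = (\n",
    "            indexer.fit(df)\n",
    "            .transform(df)\n",
    "            .drop(c)\n",
    "            .withColumnRenamed(c + \"_idx\", c)\n",
    "        )\n",
    "    return df\n",
    "\n",
    "\n",
    "# Hypothetical usage on the training split:\n",
    "# trainIdx = index_columns(train, [\"Carrier\", \"DepTimeBlk\", \"ArrTimeBlk\"])\n",
    "```\n",
    "\n",
    "Each column needs its own fit and transform pass here, whereas a single\n",
    "`DataConversion` call takes the whole column list."
   ]
  },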
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.train import TrainRegressor, TrainedRegressorModel\n",
    "from pyspark.ml.regression import LinearRegression\n",
    "\n",
    "trainCat = DataConversion(\n",
    "    cols=[\"Carrier\", \"DepTimeBlk\", \"ArrTimeBlk\"], convertTo=\"toCategorical\"\n",
    ").transform(train)\n",
    "testCat = DataConversion(\n",
    "    cols=[\"Carrier\", \"DepTimeBlk\", \"ArrTimeBlk\"], convertTo=\"toCategorical\"\n",
    ").transform(test)\n",
    "lr = LinearRegression().setRegParam(0.1).setElasticNetParam(0.3)\n",
    "model = TrainRegressor(model=lr, labelCol=\"ArrDelay\").fit(trainCat)"
   ]
  },
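  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For reference, the two settings above enter the objective that Spark's\n",
    "`LinearRegression` minimizes roughly as follows. Here $\\lambda$ is `regParam`\n",
    "(`0.1`), $\\alpha$ is `elasticNetParam` (`0.3`), $n$ is the number of training\n",
    "rows, and $w$ is the weight vector. The symbols are chosen for this note, and\n",
    "the intercept and Spark's default feature standardization are left out for\n",
    "brevity:\n",
    "\n",
    "$$\n",
    "\\min_{w} \\; \\frac{1}{2n} \\sum_{i=1}^{n} \\left( w^{\\top} x_i - y_i \\right)^2\n",
    "+ \\lambda \\left( \\alpha \\lVert w \\rVert_1\n",
    "+ \\frac{1 - \\alpha}{2} \\lVert w \\rVert_2^2 \\right)\n",
    "$$\n",
    "\n",
    "With $\\alpha = 0.3$, the L1 (lasso) term carries weight $0.3$ and the halved\n",
    "squared L2 (ridge) term carries weight $0.7$."
   ]
  },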
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Score the regressor on the test data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "scoredData = model.transform(testCat)\n",
    "scoredData.limit(10).toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Compute model metrics against the entire scored dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.train import ComputeModelStatistics\n",
    "\n",
    "metrics = ComputeModelStatistics().transform(scoredData)\n",
    "metrics.toPandas()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, compute and show statistics on individual predictions in the test\n",
    "dataset, demonstrating the usage of `ComputePerInstanceStatistics`"
   ]
  },
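  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For a regression model, the `L1_loss` and `L2_loss` columns selected in the\n",
    "next cell are the per-row absolute and squared errors. Writing $y_i$ for the\n",
    "label (`ArrDelay`) and $\\hat{y}_i$ for the prediction of row $i$ (notation\n",
    "chosen for this note):\n",
    "\n",
    "$$\n",
    "\\mathrm{L1\\_loss}_i = \\lvert y_i - \\hat{y}_i \\rvert, \\qquad\n",
    "\\mathrm{L2\\_loss}_i = \\left( y_i - \\hat{y}_i \\right)^2\n",
    "$$\n",
    "\n",
    "For example, a flight with an actual delay of 10 minutes and a predicted\n",
    "delay of 4 minutes would have an `L1_loss` of 6 and an `L2_loss` of 36."
   ]
  },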
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from synapse.ml.train import ComputePerInstanceStatistics\n",
    "\n",
    "evalPerInstance = ComputePerInstanceStatistics().transform(scoredData)\n",
    "evalPerInstance.select(\"ArrDelay\", \"prediction\", \"L1_loss\", \"L2_loss\").limit(\n",
    "    10\n",
    ").toPandas()"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [default]",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
